/*
 * Copyright (c) 2018, apexes.net. All rights reserved.
 *
 *         http://www.apexes.net
 *
 */
package net.apexes.wsonrpc.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author <a href=mailto:hedyn@foxmail.com>HeDYn</a>
 */
class WsonrpcClientImplConnector {

    private static final Logger LOG = LoggerFactory.getLogger(WsonrpcClientImplConnector.class);

    private static final int EVENT_CONNECT = 0;
    private static final int EVENT_PING = 1;

    private final Object synclock_lastPingEvent = new Object();
    private final Object synclock_lastReconectEvent = new Object();

    private final WsonrpcClientImpl client;
    private final PingProvider provider;
    /**
     * 心跳间隔时间，单位ms
     */
    private final int heartbeatMs;
    /**
     * 断开连接的心跳周期数，即当有设定值个心跳周期没有接收到数据时将自动断开连接。为0时表示不自动断开连接。
     */
    private final long heartbeatExpireNanos;
    /**
     * 自动重连间隔时间，单位ms
     */
    private final int reconnectMinMs;
    private final int reconnectMaxMs;
    private final int reconnectStepMs;
    private int reconnectCurrMs;

    private final DelayQueue<Event> eventQueue;
    private volatile ExecutorService executorService;
    private volatile long lastSendNanoTime;
    private volatile long lastRecvNanoTime;
    private volatile Event lastPingEvent;
    private volatile Event lastReconectEvent;

    WsonrpcClientImplConnector(WsonrpcClientImpl client, PingProvider provider,
                               int heartbeatMs, int heartbeatExpireCycle,
                               int reconnectMinMs, int reconnectMaxMs, int reconnectStepMs) {
        this.client = client;
        this.provider = provider;
        this.heartbeatMs = heartbeatMs;
        if (heartbeatExpireCycle > 0) {
            this.heartbeatExpireNanos = TimeUnit.MILLISECONDS.toNanos(heartbeatMs * (heartbeatExpireCycle + 1L));
        } else {
            this.heartbeatExpireNanos = 0;
        }
        this.reconnectMinMs = reconnectMinMs;
        this.reconnectMaxMs = reconnectMaxMs;
        this.reconnectStepMs = reconnectStepMs;
        this.reconnectCurrMs = reconnectMinMs;
        this.eventQueue = new DelayQueue<>();
    }

    synchronized void start() {
        LOG.debug("start ...");
        if (executorService != null) {
            executorService.shutdownNow();
        }
        executorService = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "WsonrpcClientConnector");
            t.setDaemon(true);
            return t;
        });
        executorService.execute(new EventHandler());
        connectToServer();
    }

    synchronized void stop() {
        LOG.debug("stop ...");
        if (executorService != null) {
            ExecutorService exec = executorService;
            executorService = null;
            exec.shutdownNow();
        }
    }

    boolean isRunning() {
        return executorService != null;
    }

    void onConnected() {
        LOG.debug("onConnected ...");
        reconnectCurrMs = reconnectMinMs;
        lastRecvNanoTime = 0;
        startHeartbeat();
    }

    void onDisconnected() {
        LOG.debug("onDisconnected ...");
        lastRecvNanoTime = 0;
        reconnectCurrMs = startReconnect(reconnectMinMs);
    }

    void onSentMessage() {
        startHeartbeat();
    }

    void onSentPing() {
        startHeartbeat();
    }

    void onMessage() {
        lastRecvNanoTime = System.nanoTime();
    }

    void onPong() {
        lastRecvNanoTime = System.nanoTime();
    }

    private void startHeartbeat() {
        if (heartbeatMs > 0) {
            lastSendNanoTime = System.nanoTime();
            if (lastRecvNanoTime == 0) {
                lastRecvNanoTime = lastSendNanoTime;
            }
            if (heartbeatExpireNanos > 0 && lastRecvNanoTime + heartbeatExpireNanos < lastSendNanoTime) {
                LOG.debug("disconnect ...");
                client.disconnect();
                // 主动重连，防止收不到 Server 端的 Close 帧响应不触发 onOffline
                reconnectCurrMs = startReconnect(reconnectMinMs);
            } else {
                synchronized (synclock_lastPingEvent) {
                    if (lastPingEvent != null) {
                        eventQueue.remove(lastPingEvent);
                    }
                    lastPingEvent = new Event(lastSendNanoTime, heartbeatMs, EVENT_PING);
                    eventQueue.offer(lastPingEvent);
                }
            }
        }
    }

    private int startReconnect(int delayMs) {
        if (delayMs < 0) {
            return delayMs;
        }
        synchronized (synclock_lastReconectEvent) {
            if (lastReconectEvent != null) {
                return reconnectMinMs;
            }
            LOG.info("reconnect interval : {}ms", delayMs);
            lastReconectEvent = new Event(System.nanoTime(), delayMs, EVENT_CONNECT);
            eventQueue.offer(lastReconectEvent);
            if (delayMs < reconnectMaxMs) {
                delayMs += reconnectStepMs;
            }
            if (delayMs > reconnectMaxMs) {
                delayMs = reconnectMaxMs;
            }
            return delayMs;
        }
    }

    private void connectToServer() {
        if (!client.isConnected()) {
            LOG.debug("connect ...");
            try {
                client.connectToServer();
            } catch (Exception e) {
                reconnectCurrMs = startReconnect(reconnectCurrMs);
            }
        }
    }

    private void ping() {
        if (client.isConnected()) {
            LOG.debug("ping ...");
            try {
                client.ping(provider.payload());
            } catch (Exception e) {
                WsonrpcClientLogger logger = client.getWsonrpcClientLogger();
                if (logger != null) {
                    logger.onPingError(e);
                } else {
                    LOG.error("ping error!", e);
                }
            }
        }
    }

    /**
     *
     * @author <a href=mailto:hedyn@foxmail.com>HeDYn</a>
     */
    private class EventHandler implements Runnable {
        @Override
        public void run() {
            while (executorService != null) {
                try {
                    Event event = eventQueue.take();
                    if (event.eventType == EVENT_CONNECT) {
                        synchronized (synclock_lastReconectEvent) {
                            lastReconectEvent = null;
                        }
                        connectToServer();
                    } else if (event.createNanoTime == lastSendNanoTime) {
                        ping();
                    }
                } catch (InterruptedException e) {
                    if (executorService != null) {
                        LOG.error("EventHandler interrupted.", e);
                    }
                    break;
                }
            }
        }
    }

    /**
     *
     * @author <a href=mailto:hedyn@foxmail.com>HeDYn</a>
     */
    private static class Event implements Delayed {

        private final long createNanoTime;
        private final long expireNanoTime;
        private final int eventType;

        private Event(long createNanoTime, int delayMs, int eventType) {
            this.createNanoTime = createNanoTime;
            this.expireNanoTime = createNanoTime + TimeUnit.MILLISECONDS.toNanos(delayMs);
            this.eventType = eventType;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireNanoTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            if (o == this) {
                return  0;
            }
            long v;
            if (o instanceof Event) {
                Event event = (Event) o;
                v = expireNanoTime - event.expireNanoTime;
            } else {
                v = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
            }
            if (v < 0) {
                return -1;
            }
            if (v > 0) {
                return 1;
            }
            return 0;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            Event event = (Event) o;
            return createNanoTime == event.createNanoTime &&
                    expireNanoTime == event.expireNanoTime &&
                    eventType == event.eventType;
        }

        @Override
        public int hashCode() {
            return Objects.hash(createNanoTime, expireNanoTime, eventType);
        }
    }

}
